package com.xueyuan.wata.daph.core.execution

import com.xueyuan.wata.daph.api.exception.DaphException
import com.xueyuan.wata.daph.api.executor.AbstractPoolExecutor
import com.xueyuan.wata.daph.core.execution.task.Task

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}

class BaseFutureExecutor(parallelism: Int) extends AbstractPoolExecutor(parallelism) {
  override protected def nameFormat: String = "BaseFutureExecutor-pool-%d"
  implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(pool)
  
  override def execute(): Unit = {
    logger.info("FutureBaseExecutor开始执行")

    val submittedTasks = mutable.HashSet[Task]()
    while (!getStatus) {
      val haveFailed = tasks.exists(_.context.isFailed)
      val allSucceed = tasks.forall(_.context.isSucceed)
      if (haveFailed | allSucceed) {
        setStatus(true)
      } else {
        val unSubmittedTasks = tasks.diff(submittedTasks)
        val succeedTasksInSubmitted = submittedTasks.filter(_.context.isSucceed)
        val unSubmittedTasksAllUpSucceed = unSubmittedTasks.filter { task =>
          _dag.getAllUpTasks(task).forall(succeedTasksInSubmitted.contains)
        }
        unSubmittedTasksAllUpSucceed.foreach { task =>
          if (!task.context.isSubmitted) {
            logger.info(s"Submit $task")
            Future {
              task.run()
            }

            task.context.markTaskSubmitted()
            logger.info(s"$this submitted")

            submittedTasks.add(task)
          }
        }
      }
    }

    val tasksFailed = tasks.filter(_.context.isFailed)
    if (tasksFailed.isEmpty) {
      logger.info("FutureBaseExecutor执行成功")
    } else {
      tasksFailed.foreach { task =>
        val te = task.context.getException.get
        logger.error("Task运行失败", te)
      }
      throw new DaphException(s"FutureBaseExecutor执行失败")
    }
  }

  override def stop(): Unit = {
    pool.shutdownNow()
  }
}
